home
***
CD-ROM
|
disk
|
FTP
|
other
***
search
/
Personal Computer World 2009 February
/
PCWFEB09.iso
/
Software
/
Resources
/
Chat & Communication
/
Digsby build 37
/
digsby_setup.exe
/
lib
/
ZSI
/
resolvers.pyo
(
.txt
)
< prev
next >
Wrap
Python Compiled Bytecode
|
2008-10-13
|
5KB
|
167 lines
# Source Generated with Decompyle++
# File: in.pyo (Python 2.5)
from ZSI import _copyright, _child_elements, EvaluateException, TC
import multifile
import mimetools
import urllib
from base64 import decodestring as b64decode
import cStringIO as StringIO
def Opaque(uri, tc, ps, **keywords):
source = urllib.urlopen(uri, **keywords)
enc = source.info().getencoding()
if enc in ('7bit', '8bit', 'binary'):
return source.read()
data = StringIO.StringIO()
mimetools.decode(source, data, enc)
return data.getvalue()
def XML(uri, tc, ps, **keywords):
source = urllib.urlopen(uri, **keywords)
enc = source.info().getencoding()
if enc in ('7bit', '8bit', 'binary'):
data = source
else:
data = StringIO.StringIO()
mimetools.decode(source, data, enc)
data.seek(0)
dom = ps.readerclass().fromStream(data)
return _child_elements(dom)[0]
class NetworkResolver:
def __init__(self, prefix = None):
if not prefix:
pass
self.allowed = []
def _check_allowed(self, uri):
for a in self.allowed:
if uri.startswith(a):
return None
continue
raise EvaluateException('Disallowed URI prefix')
def Opaque(self, uri, tc, ps, **keywords):
self._check_allowed(uri)
return Opaque(uri, tc, ps, **keywords)
def XML(self, uri, tc, ps, **keywords):
self._check_allowed(uri)
return XML(uri, tc, ps, **keywords)
def Resolve(self, uri, tc, ps, **keywords):
if isinstance(tc, TC.XML):
return XML(uri, tc, ps, **keywords)
return Opaque(uri, tc, ps, **keywords)
class MIMEResolver:
def __init__(self, ct, f, next = None, uribase = 'thismessage:/', seekable = 0, **kw):
for param in ct.split(';'):
a = param.strip()
if a.startswith('boundary='):
if a[9] in ('"', "'"):
boundary = a[10:-1]
else:
boundary = a[9:]
break
continue
else:
raise ValueError('boundary parameter not found')
self.id_dict = { }
self.loc_dict = { }
self.parts = []
self.next = next
self.base = uribase
mf = multifile.MultiFile(f, seekable)
mf.push(boundary)
while mf.next():
head = mimetools.Message(mf)
body = StringIO.StringIO()
mimetools.decode(mf, body, head.getencoding())
body.seek(0)
part = (head, body)
self.parts.append(part)
key = head.get('content-id')
if key:
if key[0] == '<' and key[-1] == '>':
key = key[1:-1]
self.id_dict[key] = part
key = head.get('content-location')
if key:
self.loc_dict[key] = part
continue
mf.pop()
def GetSOAPPart(self):
(head, part) = self.parts[0]
return StringIO.StringIO(part.getvalue())
def get(self, uri):
if uri.startswith('cid:'):
(head, part) = self.id_dict[uri[4:]]
return StringIO.StringIO(part.getvalue())
if self.loc_dict.has_key(uri):
(head, part) = self.loc_dict[uri]
return StringIO.StringIO(part.getvalue())
def Opaque(self, uri, tc, ps, **keywords):
content = self.get(uri)
if content:
return content.getvalue()
if not self.next:
raise EvaluateException('Unresolvable URI ' + uri)
return self.next.Opaque(uri, tc, ps, **keywords)
def XML(self, uri, tc, ps, **keywords):
content = self.get(uri)
if content:
dom = ps.readerclass().fromStream(content)
return _child_elements(dom)[0]
if not self.next:
raise EvaluateException('Unresolvable URI ' + uri)
return self.next.XML(uri, tc, ps, **keywords)
def Resolve(self, uri, tc, ps, **keywords):
if isinstance(tc, TC.XML):
return self.XML(uri, tc, ps, **keywords)
return self.Opaque(uri, tc, ps, **keywords)
def __getitem__(self, cid):
(head, body) = self.id_dict[cid]
newio = StringIO.StringIO(body.getvalue())
return newio
if __name__ == '__main__':
print _copyright